/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.bff.gaia.unified.sdk.io.gcp.bigquery;

import static com.bff.gaia.unified.vendor.guava.com.google.common.base.Preconditions.checkNotNull;

import com.bff.gaia.unified.sdk.annotations.Experimental;
import com.bff.gaia.unified.sdk.coders.Coder;
import com.bff.gaia.unified.sdk.io.OffsetBasedSource;
import com.bff.gaia.unified.sdk.io.Source;
import com.bff.gaia.unified.sdk.options.PipelineOptions;
import com.bff.gaia.unified.sdk.transforms.SerializableFunction;
import com.bff.gaia.unified.sdk.transforms.display.DisplayData;
import com.bff.gaia.unified.vendor.guava.com.google.common.collect.ImmutableList;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.bigquery.storage.v1beta1.Storage.ReadRowsRequest;
import com.google.cloud.bigquery.storage.v1beta1.Storage.ReadRowsResponse;
import com.google.cloud.bigquery.storage.v1beta1.Storage.ReadSession;
import com.google.cloud.bigquery.storage.v1beta1.Storage.Stream;
import com.google.cloud.bigquery.storage.v1beta1.Storage.StreamPosition;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;

/** A {@link Source} representing a single stream in a read session. */
@Experimental(Experimental.Kind.SOURCE_SINK)
public class BigQueryStorageStreamSource<T> extends OffsetBasedSource<T> {

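  /**
   * Creates a source that reads the given {@code stream} of {@code readSession} from offset 0 to
   * the end of the stream, decoding rows with the session's Avro schema and converting each one
   * to {@code T} via {@code parseFn}.
   *
   * <p>A minimal usage sketch; {@code MyRow}, {@code session}, {@code schema}, {@code parseFn},
   * {@code coder}, and {@code services} are caller-supplied placeholders, not part of this class:
   *
   * <pre>{@code
   * BigQueryStorageStreamSource<MyRow> source =
   *     BigQueryStorageStreamSource.create(
   *         session, session.getStreams(0), schema, parseFn, coder, services);
   * }</pre>
   */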
  public static <T> BigQueryStorageStreamSource<T> create(
      ReadSession readSession,
      Stream stream,
      TableSchema tableSchema,
      SerializableFunction<SchemaAndRecord, T> parseFn,
      Coder<T> outputCoder,
      BigQueryServices bqServices) {
    return new BigQueryStorageStreamSource<>(
        readSession,
        stream,
        0L,
        Long.MAX_VALUE,
        1L,
        BigQueryHelpers.toJsonString(checkNotNull(tableSchema, "tableSchema")),
        parseFn,
        outputCoder,
        bqServices);
  }

  private final ReadSession readSession;
  private final Stream stream;
  private final String jsonTableSchema;
  private final SerializableFunction<SchemaAndRecord, T> parseFn;
  private final Coder<T> outputCoder;
  private final BigQueryServices bqServices;

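  /**
   * Offsets here are row offsets within the stream (they are fed to {@code StreamPosition} and
   * advance by one per record read); {@code create} passes {@code [0, Long.MAX_VALUE)} so a new
   * source spans the entire stream.
   */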
  private BigQueryStorageStreamSource(
      ReadSession readSession,
      Stream stream,
      long startOffset,
      long stopOffset,
      long minBundleSize,
      String jsonTableSchema,
      SerializableFunction<SchemaAndRecord, T> parseFn,
      Coder<T> outputCoder,
      BigQueryServices bqServices) {
    super(startOffset, stopOffset, minBundleSize);
    this.readSession = checkNotNull(readSession, "readSession");
    this.stream = checkNotNull(stream, "stream");
    this.jsonTableSchema = checkNotNull(jsonTableSchema, "jsonTableSchema");
    this.parseFn = checkNotNull(parseFn, "parseFn");
    this.outputCoder = checkNotNull(outputCoder, "outputCoder");
    this.bqServices = checkNotNull(bqServices, "bqServices");
  }

  @Override
  public Coder<T> getOutputCoder() {
    return outputCoder;
  }

  @Override
  public void populateDisplayData(DisplayData.Builder builder) {
    super.populateDisplayData(builder);
    builder
        .addIfNotNull(
            DisplayData.item("table", BigQueryHelpers.toTableSpec(readSession.getTableReference()))
                .withLabel("Table"))
        .add(DisplayData.item("readSession", readSession.getName()).withLabel("Read session"))
        .add(DisplayData.item("stream", stream.getName()).withLabel("Stream"));
  }

  @Override
  public long getEstimatedSizeBytes(PipelineOptions options) {
    // The size of a stream source can't be estimated due to server-side liquid sharding.
    // TODO: Implement progress reporting.
    return 0L;
  }

  @Override
  public List<? extends OffsetBasedSource<T>> split(
      long desiredBundleSizeBytes, PipelineOptions options) {
    // A stream source can't be split without reading from it due to server-side liquid sharding.
    // TODO: Implement dynamic work rebalancing.
    return ImmutableList.of(this);
  }

  @Override
  public long getMaxEndOffset(PipelineOptions options) {
    // This method should never be called given the overrides above.
    throw new UnsupportedOperationException("Not implemented");
  }

  @Override
  public OffsetBasedSource<T> createSourceForSubrange(long start, long end) {
    // This method should never be called given the overrides above.
    throw new UnsupportedOperationException("Not implemented");
  }

  @Override
  public BigQueryStorageStreamReader<T> createReader(PipelineOptions options) throws IOException {
    return new BigQueryStorageStreamReader<>(this, options.as(BigQueryOptions.class));
  }

  /** A {@link Source.Reader} which reads records from a stream. */
  @Experimental(Experimental.Kind.SOURCE_SINK)
  public static class BigQueryStorageStreamReader<T> extends OffsetBasedReader<T> {

    // Immutable configuration, fixed at construction time.
    private final DatumReader<GenericRecord> datumReader;
    private final SerializableFunction<SchemaAndRecord, T> parseFn;
    private final BigQueryServices.StorageClient storageClient;
    private final TableSchema tableSchema;

    // Mutable state used while reading.
    private Iterator<ReadRowsResponse> responseIterator;
    private BinaryDecoder decoder;
    private GenericRecord record;
    private T current;
    private long currentOffset;

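    /**
     * Builds a reusable Avro datum reader from the read session's schema and opens a storage
     * client for the ReadRows call; the JSON table schema is decoded once so that each record can
     * be paired with it in {@link SchemaAndRecord}.
     */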
    private BigQueryStorageStreamReader(
        BigQueryStorageStreamSource<T> source, BigQueryOptions options) throws IOException {
      super(source);
      this.datumReader =
          new GenericDatumReader<>(
              new Schema.Parser().parse(source.readSession.getAvroSchema().getSchema()));
      this.parseFn = source.parseFn;
      this.storageClient = source.bqServices.getStorageClient(options);
      this.tableSchema = BigQueryHelpers.fromJsonString(source.jsonTableSchema, TableSchema.class);
    }

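    /**
     * Opens the ReadRows stream at the source's start offset and advances to the first record.
     * Callers follow the usual reader protocol: {@code start()} once, then {@code advance()}
     * until either returns false.
     */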
    @Override
    protected boolean startImpl() throws IOException {
      BigQueryStorageStreamSource<T> source = getCurrentSource();
      currentOffset = source.getStartOffset();

      ReadRowsRequest request =
          ReadRowsRequest.newBuilder()
              .setReadPosition(
                  StreamPosition.newBuilder().setStream(source.stream).setOffset(currentOffset))
              .build();

      responseIterator = storageClient.readRows(request).iterator();
      return readNextRecord();
    }

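    /** Each successfully returned record consumes exactly one row offset in the stream. */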
    @Override
    protected boolean advanceImpl() throws IOException {
      currentOffset++;
      return readNextRecord();
    }

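    /**
     * Decodes the next row from the current Avro block, fetching further {@link ReadRowsResponse}
     * messages as blocks are exhausted. Returns false once the server has no more responses.
     */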
    private boolean readNextRecord() throws IOException {
      while (decoder == null || decoder.isEnd()) {
        if (!responseIterator.hasNext()) {
          return false;
        }

        // Re-point the decoder at the next response's Avro payload, reusing the decoder
        // instance to avoid a per-block allocation.
        ReadRowsResponse nextResponse = responseIterator.next();
        decoder =
            DecoderFactory.get()
                .binaryDecoder(
                    nextResponse.getAvroRows().getSerializedBinaryRows().toByteArray(), decoder);
      }

      // The GenericRecord instance is reused as well; parseFn sees each freshly decoded row.
      record = datumReader.read(record, decoder);
      current = parseFn.apply(new SchemaAndRecord(record, tableSchema));
      return true;
    }

    @Override
    public T getCurrent() throws NoSuchElementException {
      return current;
    }

    @Override
    protected long getCurrentOffset() throws NoSuchElementException {
      return currentOffset;
    }

    @Override
    public void close() {
      storageClient.close();
    }

    @Override
    public synchronized BigQueryStorageStreamSource<T> getCurrentSource() {
      return (BigQueryStorageStreamSource<T>) super.getCurrentSource();
    }

    @Override
    public boolean allowsDynamicSplitting() {
      // With server-side liquid sharding, the server decides how rows are distributed across
      // streams, so this reader never honors split requests.
      return false;
    }
  }
}